#!/usr/bin/python3

# This file is part of Cockpit.
#
# Copyright (C) 2015 Red Hat, Inc.
#
# Cockpit is free software; you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 2.1 of the License, or
# (at your option) any later version.
#
# Cockpit is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Cockpit; If not, see <http://www.gnu.org/licenses/>.

import parent
from storagelib import *
from testlib import *


# Text of a systemd unit describing the Storaged D-Bus service
# (Type=dbus, BusName=org.storaged.Storaged) that starts
# /usr/lib/storaged/storaged directly.
# NOTE(review): none of the tests visible in this file reference this
# variable; presumably it is kept around for manual debugging - confirm
# or remove.
storaged_debug_service = """
[Unit]
Description=Storaged
Documentation=man:storaged(8)

[Service]
Type=dbus
BusName=org.storaged.Storaged
ExecStart=/usr/lib/storaged/storaged
"""


class TestStorageMounting(StorageCase):

    def testMounting(self):
        """Format a disk, then unmount, rename, re-point and remount its filesystem.

        Afterwards the overview page is checked: the usage shown for
        the filesystem has to follow a 30M file being written and
        removed again, and activating the filesystem row has to lead
        to the details of the drive.
        """
        m = self.machine
        b = self.browser

        mount_point_foo = "/run/foo"
        mount_point_bar = "/run/bar"

        self.login_and_go("/storage")

        # Add a disk
        m.add_disk("50M", serial="MYDISK")
        b.wait_in_text("#drives", "MYDISK")
        b.click('#drives tr:contains("MYDISK")')
        b.wait_present('#storage-detail')

        # Format it.  Applying with an empty mount point must produce
        # a validation error; only then is the real mount point filled in.

        self.content_head_action(1, "Format")
        self.dialog_wait_open()
        self.dialog_set_val("type", "ext4")
        self.dialog_set_val("name", "FILESYSTEM")
        self.dialog_set_val("mount_point", "")
        self.dialog_apply()
        self.dialog_wait_error("mount_point", "Mount point cannot be empty")
        self.dialog_set_val("mount_point", mount_point_foo)
        self.dialog_apply()
        self.dialog_wait_close()
        self.content_row_wait_in_col(1, 1, "ext4 file system")
        self.content_tab_wait_in_info(1, 1, "Name", "FILESYSTEM")
        self.wait_in_storaged_configuration(mount_point_foo)

        self.content_tab_wait_in_info(1, 1, "Mount point", mount_point_foo)

        # Unmount it again

        self.content_dropdown_action(1, "Unmount")
        self.content_tab_wait_in_info(1, 1, "Mount point", "The filesystem is not mounted")

        # Rename it (to the lower-case spelling)

        self.content_tab_info_action(1, 1, "Name")
        self.dialog({"name": "filesystem"})
        self.content_tab_wait_in_info(1, 1, "Name", "filesystem")

        # Change the mount point.  The dialog is expected to come up
        # with the old mount point and gets the new one filled in.

        self.dialog_with_retry(trigger=lambda: self.content_tab_info_action(1, 1, "Mount point", wrapped=True),
                               expect={"mount_point": mount_point_foo},
                               values={"mount_point": mount_point_bar})
        self.wait_in_storaged_configuration(mount_point_bar)
        self.content_tab_wait_in_info(1, 1, "Mount point", mount_point_bar)

        # Mount it on the new mount point

        self.content_head_action(1, "Mount")
        self.confirm()
        self.wait_mounted(1, 1)

        # Go to overview page and check that the filesystem usage is
        # displayed correctly.

        def wait_ratio_in_range(sel, low, high):
            """Wait until the "USED / TOTAL" text of 'sel' has a ratio in [low, high].

            Both USED and TOTAL may carry a fractional part; the ratio
            is computed from the two numbers as displayed.
            """
            b.wait_js_func("""(function (sel, low, high) {
              var text = ph_text(sel);
              var match = text.match('([0-9.]+) / ([0-9.]+)');
              if (!match)
                return false;
              var ratio = parseFloat(match[1]) / parseFloat(match[2]);
              return low <= ratio && ratio <= high;
            })""", sel, low, high)

        b.go("#/")
        b.wait_in_text("table[aria-label=Filesystems]", mount_point_bar)
        bar_selector = 'table[aria-label=Filesystems] tr:contains("%s") td:nth-child(3)' % mount_point_bar
        # Nearly empty at first, more than half full with a 30M file on
        # the 50M disk, and nearly empty again once the file is removed.
        wait_ratio_in_range(bar_selector, 0.0, 0.1)
        m.execute("dd if=/dev/zero of=%s/zero bs=1M count=30 status=none" % mount_point_bar)
        wait_ratio_in_range(bar_selector, 0.5, 1.0)
        m.execute("rm %s/zero" % mount_point_bar)
        wait_ratio_in_range(bar_selector, 0.0, 0.1)

        # Activating the filesystem row has to open the details of the
        # drive, which is identified by its serial number.
        b.mousedown('table[aria-label=Filesystems] tr:contains("filesystem")')
        b.wait_visible("#storage-detail")

        def wait_info_field_value(name, value):
            """Wait for the detail header field labelled 'name' to show exactly 'value'."""
            return b.wait_text('#detail-header label:contains("%s") + div' % name, value)

        wait_info_field_value("Serial number", "MYDISK")

    def testMountOptions(self):
        """Check the coupling of the crypto and mount option checkboxes when formatting.

        The Format dialog is opened for a fresh disk with encryption
        switched on.  The test verifies the initial checkbox states,
        that toggling "ro" on one side is mirrored on the other side,
        and that the options finally end up in /etc/crypttab and
        /etc/fstab.
        """
        machine = self.machine
        browser = self.browser

        passphrase = "vainu-reku-toma-rolle-kaja"
        mount_point = "/data"

        def wait_checkbox(field, checked):
            # Wait for the dialog checkbox of 'field' to reach the wanted state.
            state = ":checked" if checked else ":not(:checked)"
            browser.wait_present(self.dialog_field(field) + state)

        self.login_and_go("/storage")

        # Attach a disk and open its details
        machine.add_disk("50M", serial="MYDISK")
        browser.wait_in_text("#drives", "MYDISK")
        browser.click('#drives tr:contains("MYDISK")')
        browser.wait_present('#storage-detail')

        # Bring up the format dialog with encryption enabled
        self.content_head_action(1, "Format")
        self.dialog_wait_open()
        self.dialog_set_val("type", "ext4")
        self.dialog_set_val("crypto.on", True)

        # Initially "auto" is ticked and "ro" is not, on both sides.
        for field in ("crypto_options.auto", "mount_options.auto"):
            wait_checkbox(field, True)
        for field in ("crypto_options.ro", "mount_options.ro"):
            wait_checkbox(field, False)

        # Ticking crypto "ro" has to tick mount "ro" as well ...
        self.dialog_set_val("crypto_options.ro", True)
        wait_checkbox("mount_options.ro", True)

        # ... and clearing mount "ro" has to clear crypto "ro" again.
        self.dialog_set_val("mount_options.ro", False)
        wait_checkbox("crypto_options.ro", False)

        # Clear crypto "auto" and tick crypto "ro" once more, so that
        # their effect can be verified in the written configuration.
        self.dialog_set_val("crypto_options.auto", False)
        self.dialog_set_val("crypto_options.ro", True)

        # The same extra option goes to both sides.
        for field in ("crypto_options.extra", "mount_options.extra"):
            self.dialog_set_val(field, "x-foo")

        # Fill in the remaining fields and format
        for field in ("passphrase", "passphrase2"):
            self.dialog_set_val(field, passphrase)
        self.dialog_set_val("mount_point", mount_point)
        self.dialog_apply()
        self.dialog_wait_close()

        self.content_row_wait_in_col(1, 1, "Encrypted data")
        self.content_row_wait_in_col(2, 1, "ext4 file system")
        self.wait_in_storaged_configuration(mount_point)
        self.wait_mounted(2, 1)

        # grep fails the command (and thereby the test) when the
        # expected option list is missing.
        machine.execute("grep 'noauto,readonly,x-foo' /etc/crypttab")
        machine.execute("grep 'noauto,ro,x-foo' /etc/fstab")


    def testMountingHelp(self):
        """Check the buttons offered after the mount state was changed behind Cockpit's back.

        A filesystem is formatted with mount point /run/foo and then
        repeatedly mounted, unmounted or moved from the command line.
        After each external change the test clicks the button with the
        expected label in the filesystem tab and waits for that button
        to go away again.
        """
        machine = self.machine
        browser = self.browser

        self.login_and_go("/storage")

        # Attach a disk and open its details
        machine.add_disk("50M", serial="MYDISK")
        browser.wait_in_text("#drives", "MYDISK")
        browser.click('#drives tr:contains("MYDISK")')
        browser.wait_present('#storage-detail')

        # Put an ext4 filesystem with mount point /run/foo on it
        self.content_head_action(1, "Format")
        self.dialog_wait_open()
        for field, value in (("type", "ext4"),
                             ("name", "FILESYSTEM"),
                             ("mount_point", "/run/foo")):
            self.dialog_set_val(field, value)
        self.dialog_apply()
        self.dialog_wait_close()
        self.content_row_wait_in_col(1, 1, "ext4 file system")
        self.content_tab_wait_in_info(1, 1, "Name", "FILESYSTEM")
        self.content_tab_wait_in_info(1, 1, "Mount point", "/run/foo")

        fsys_tab = self.content_tab_expand(1, 1)

        def press_and_wait_gone(label):
            # Click the button carrying 'label' in the filesystem tab
            # and wait until it is no longer present.
            button = fsys_tab + " button:contains(" + label + ")"
            browser.click(button)
            browser.wait_not_present(button)

        def move_to_bar():
            # Take the filesystem off /run/foo and mount it on /run/bar.
            machine.execute("umount /run/foo")
            machine.execute("mkdir -p /run/bar && mount /dev/sda /run/bar")

        # Unmounted externally -> mount it again with Cockpit
        machine.execute("umount /run/foo")
        press_and_wait_gone("Mount now")
        self.wait_mounted(1, 1)

        # Unmounted externally -> let Cockpit adjust fstab instead
        machine.execute("umount /run/foo")
        press_and_wait_gone("Do not mount automatically on boot")

        # Mounted externally -> unmount it again with Cockpit
        machine.execute("mount /run/foo")
        press_and_wait_gone("Unmount now")

        # Mounted externally -> let Cockpit adjust fstab instead
        machine.execute("mount /run/foo")
        press_and_wait_gone("Mount also automatically on boot")

        # Moved to /run/bar externally -> move it back with Cockpit
        move_to_bar()
        press_and_wait_gone("Mount on /run/foo now")

        # Moved to /run/bar externally -> let Cockpit adjust fstab instead
        move_to_bar()
        press_and_wait_gone("Mount automatically on /run/bar on boot")

    def testDuplicateMountPoints(self):
        """Check that a mount point cannot be given to two different filesystems.

        Two logical volumes are created.  The first is formatted with
        encryption and mount point /run/data; formatting the second
        with the same mount point must be refused with an error that
        names the /dev/mapper/luks- device.  Re-formatting the first
        volume with its own mount point must still be accepted.
        """
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        # Quickly make two logical volumes
        m.add_disk("50M", serial="MYDISK")
        b.wait_in_text("#drives", "MYDISK")
        m.execute("vgcreate test /dev/sda && lvcreate test -n one -L 20M && lvcreate test -n two -L 20M")
        b.click('#devices tr:contains("test")')
        b.wait_visible("#storage-detail")
        self.content_row_wait_in_col(1, 2, "/dev/test/one")
        self.content_row_wait_in_col(2, 2, "/dev/test/two")

        # Encrypt and format the first and give it /run/data as the mount point.
        # "crypto.on" is a checkbox and takes a boolean, as in
        # testMountOptions; the rest of this test relies on the
        # encryption layer actually being created.
        self.content_head_action(1, "Format")
        self.dialog({"type": "ext4",
                     "crypto.on": True,
                     "passphrase": "vainu-reku-toma-rolle-kaja",
                     "passphrase2": "vainu-reku-toma-rolle-kaja",
                     "mount_point": "/run/data"})
        # The filesystem is expected in row 2, below the encrypted volume.
        self.content_row_wait_in_col(2, 1, "ext4 file system")
        self.content_tab_wait_in_info(2, 1, "Mount point", "/run/data")

        # Format the second and also try to use /run/data as the mount point.
        # With the filesystem of the first volume in row 2, the second
        # volume is addressed as row 3.
        self.content_head_action(3, "Format")
        self.dialog_wait_open()
        self.dialog_set_val("type", "ext4")
        self.dialog_set_val("mount_point", "/run/data")
        self.dialog_apply()
        self.dialog_wait_error("mount_point", "Mount point is already used for /dev/mapper/luks-")
        self.dialog_cancel()
        self.dialog_wait_close()

        # Format the first and re-use /run/data as the mount point.
        # This should be allowed.
        self.content_dropdown_action(1, "Format")
        self.dialog({"type": "ext4",
                     "mount_point": "/run/data"})
        self.content_row_wait_in_col(1, 1, "ext4 file system")
        self.content_tab_wait_in_info(1, 2, "Mount point", "/run/data")

    def testFirstMount(self):
        """Check that mounting a filesystem for the first time records it in /etc/fstab.

        The filesystem is created from the command line, so /etc/fstab
        has no entry for it before the "Mount" dialog is used.
        """
        m = self.machine
        b = self.browser

        self.login_and_go("/storage")

        m.add_disk("50M", serial="MYDISK")
        # Wait for the new disk to be listed before clicking on it, as
        # the other tests in this class do.
        b.wait_in_text("#drives", "MYDISK")
        b.click("#drives tr:contains(MYDISK)")
        b.wait_visible("#storage-detail")

        # Create the filesystem outside of Cockpit
        m.execute("mkfs.ext4 /dev/sda")
        self.content_row_wait_in_col(1, 1, "ext4 file system")

        # The mount point must not be in fstab yet ...
        m.execute("! grep /run/data /etc/fstab")
        self.content_head_action(1, "Mount")
        self.dialog({"mount_point": "/run/data",
                     "mount_options.extra": "x-foo"})
        # ... and must be there, together with the extra option, afterwards.
        m.execute("grep /run/data /etc/fstab")
        m.execute("grep 'x-foo' /etc/fstab")

# Run the tests when this file is executed as a script.
if "__main__" == __name__:
    test_main()
